tg-me.com/pythonl/4782
Last Update:
Вам нужно реализовать декоратор @thread_safe_cached
, который:
- Кэширует результат вызова функции по её аргументам (аналог functools.lru_cache
, но свой).
- Если несколько потоков одновременно вызывают функцию с одинаковыми аргументами:
- Только один поток реально выполняет функцию,
- Остальные ждут, пока результат будет вычислен, и получают готовый результат.
- Кэш никогда не удаляется (неограниченный размер).
Ограничения:
- Решение должно работать для произвольных функций и аргументов (hashable).
- Нельзя использовать готовый functools.lru_cache
или другие библиотеки кэширования.
- Нужно обеспечить корректную работу в многопоточной среде без гонок данных.
---
▪️ Подсказки:
- Для кэширования подойдёт dict
с ключами по аргументам (`*args`, `**kwargs`).
- Для защиты доступа к кэшу понадобится threading.Lock
.
- Для ожидания завершения вычисления другими потоками можно использовать threading.Event
.
- Продумайте carefully: как отличить "результат уже посчитан" от "результат в процессе вычисления".
---
▪️ Что оценивается:
- Умение работать с многопоточностью в Python.
- Правильная организация кэширования.
- Чистота и лаконичность кода.
- Умение обрабатывать тонкие случаи, например: одновременные вызовы.
---
▪️ Разбор возможного решения:
Основная идея:
- Создать кэш cache: Dict[Key, Result]
.
- Одновременно создать словарь "ожиданий" in_progress: Dict[Key, threading.Event]
.
- Если кто-то начал считать значение:
- Остальные ждут Event
, пока оно не будет установлено.
Пример реализации:
import threading
import functools
def thread_safe_cached(func):
cache = {}
in_progress = {}
lock = threading.Lock()
@functools.wraps(func)
def wrapper(*args, **kwargs):
key = (args, frozenset(kwargs.items()))
with lock:
if key in cache:
return cache[key]
if key not in in_progress:
in_progress[key] = threading.Event()
in_progress[key].clear()
creator = True
else:
creator = False
if creator:
try:
result = func(*args, **kwargs)
with lock:
cache[key] = result
finally:
in_progress[key].set()
with lock:
del in_progress[key]
return result
else:
in_progress[key].wait()
with lock:
return cache[key]
return wrapper
---
▪️ Пояснения к коду:
- При первом вызове для новых аргументов поток создаёт
Event
и начинает считать результат.- Остальные потоки видят
Event
и вызывают wait()
, пока первый поток не установит set()
.- Как только результат посчитан,
Event
сигнализирует всем ждущим потокам, что данные готовы.- Доступ к
cache
и in_progress
защищён через lock
для избежания гонок.---
▪️ Возможные подводные камни:
- ❗ Если не удалять
Event
из in_progress
, кэш постепенно раздуется мусором.- ❗ Если ошибка случится внутри
func
, необходимо всё равно освободить Event
, иначе потоки будут вечно ждать.- ❗ Нельзя держать
lock
во время выполнения тяжёлой функции func
, иначе все потоки будут блокироваться.---
▪️ Вопросы на собеседовании по этой задаче:
- Как изменить реализацию, чтобы кэш имел ограничение по размеру (например, максимум 1000 элементов)?
- Как адаптировать декоратор под асинхронные функции (`async def`)?
- Что будет, если
func
иногда бросает исключения? Как кэшировать ошибки или не кэшировать их?- Как изменить реализацию так, чтобы кэш удалял устаревшие данные через TTL (Time-To-Live)?
---
@pythonl